Source code for hysop.core.graph.graph

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import inspect
import networkx
from hysop import dprint
from hysop.constants import MemoryOrdering
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.decorators import debug, wraps, profile


[docs] def is_directed_acyclic_graph(graph): return networkx.algorithms.dag.is_directed_acyclic_graph(graph)
[docs] def transitive_reduction(graph): reduced_graph = networkx.algorithms.dag.transitive_reduction(graph) # copy back edge attributes (node data is automatically transferred # because nodes are the data (VertexAttributes)) for node in reduced_graph: for out_node in reduced_graph[node]: for k, v in graph[node][out_node].items(): reduced_graph[node][out_node][k] = v return reduced_graph
[docs] def all_simple_paths(graph, src, dst): return tuple(networkx.algorithms.simple_paths.all_simple_paths(graph, src, dst))
[docs] def lexicographical_topological_sort(graph): # Lexicographical sort ensures a unique permutations of nodes # such that they are in the same topological order on each # MPI process. Else operators will not be executed in the same # order and everything deadlocks on MPI synchronization. topo_sort = tuple( networkx.algorithms.dag.lexicographical_topological_sort( graph, key=lambda x: int(x) ) ) return topo_sort
[docs] def new_directed_graph(): return networkx.DiGraph()
[docs] def new_vertex(graph, *args, **kwds): # /!\ We have to use networkx 2.2 which has a different interface for attributes node = VertexAttributes(graph, *args, **kwds) graph.add_node(node) return node
[docs] def new_edge(graph, u, v, *args, **kwds): # /!\ We have to use networkx 2.2 which has a different interface for attributes assert u in graph assert v in graph if v not in graph[u]: data = EdgeAttributes(*args, **kwds) graph.add_edge(u, v, data=data) else: edge = graph[u][v] edge["data"].update(*args, **kwds) return (u, v)
[docs] def generate_vertex_colors(): try: import matplotlib except ImportError: return None c0 = matplotlib.colormaps["tab20c"].colors c1 = matplotlib.colormaps["tab20b"].colors colors = [] for i in (2, 3, 0, 1): colors += c0[i::4] + c1[i::4] colors = tuple(map(matplotlib.colors.to_hex, colors)) return colors
[docs] class VertexAttributes: """Simple class to hold vertex data.""" colors = generate_vertex_colors() def __init__(self, graph, operator=None): if not hasattr(graph, "_hysop_node_counter"): graph._hysop_node_counter = 0 node_id = graph._hysop_node_counter graph._hysop_node_counter += 1 self.node_id = node_id self.operator = operator self.input_states = None self.output_states = None self.op_ordering = None self.command_queue = None
[docs] def copy_attributes(self, other): if other is None: return self check_instance(other, VertexAttributes) for vname in ( "operator", "input_states", "output_states", "op_ordering", "command_queue", ): setattr( self, vname, first_not_None(getattr(self, vname), getattr(other, vname)) ) return self
[docs] def set_op_info(self, operator, input_states, output_states): assert self.operator is not None assert self.operator is operator self.operator = operator self.input_states = input_states self.output_states = output_states return self
# hashing for networkx def __hash__(self): return self.node_id def __eq__(self, other): return self.node_id == other.node_id def __int__(self): return self.node_id # pyvis attributes for display @property def label(self): s = f"{self.operator.pretty_name}" if self.op_ordering is not None: s = f"({self.op_ordering})\n{s}" return s @property def title(self): return self.node_info().replace("\n", "<br>")
[docs] def shape(self, with_custom_nodes=True): from hysop.operator.base.transpose_operator import TransposeOperatorBase from hysop.operator.base.redistribute_operator import RedistributeOperatorBase from hysop.operator.base.memory_reordering import MemoryReorderingBase special_shapes = { RedistributeOperatorBase: "box", TransposeOperatorBase: "box", MemoryReorderingBase: "box", } if with_custom_nodes: for op_type, shape in special_shapes.items(): if isinstance(self.operator, op_type): return shape return "circle"
@property def color(self): cq = self.command_queue if cq is None: return None assert isinstance(cq, int) and cq >= 0 colors = self.colors ncolors = len(colors) return colors[cq % ncolors]
[docs] def node_info(self): op = self.operator istates = self.input_states ostates = self.output_states ifields = op.input_fields ofields = op.output_fields iparams = op.input_params oparams = op.output_params memorder2str = { MemoryOrdering.C_CONTIGUOUS: "C", MemoryOrdering.F_CONTIGUOUS: "F", } def ifinfo(field, topo): info = (field.pretty_name, topo.id) if istates: assert field in istates istate = istates[field] assert istate is not None info += (memorder2str[istate.memory_order],) info += (str(istate.tstate),) return ", ".join(map(str, info)) def ofinfo(field, topo): info = (field.pretty_name, topo.id) if ostates: assert field in ostates ostate = ostates[field] assert ostate is not None info += (memorder2str[ostate.memory_order],) info += (str(ostate.tstate),) return ", ".join(map(str, info)) def ipinfo(param): return param.pretty_name def opinfo(param): return param.pretty_name prefix = "&nbsp;&nbsp<b>" suffix = "</b>&nbsp;&nbsp" sep = "\n" + "&nbsp" * 14 ss = "<h2>Operator {}</h2>{}{}{}{}{}\n{}".format( op.name, f"{prefix}Rank:{suffix}{self.op_ordering}\n\n" if self.op_ordering else "", ( "{p}Pin:{s}{}\n".format( sep.join(ipinfo(param) for param in iparams.keys()), p=prefix, s=suffix + "&nbsp&nbsp", ) if iparams else "" ), ( "{p}Fin:{s}{}\n".format( sep.join([ifinfo(f, topo) for (f, topo) in ifields.items()]), p=prefix, s=suffix + "&nbsp&nbsp", ) if ifields else "" ), ( "{p}Pout:{s}{}\n".format( sep.join([opinfo(param) for param in oparams.keys()]), p=prefix, s=suffix, ) if oparams else "" ), ( "{p}Fout:{s}{}\n".format( sep.join([ofinfo(f, topo) for (f, topo) in ofields.items()]), p=prefix, s=suffix, ) if ofields else "" ), "{p}Type:{s} {}".format( sep.join(map(lambda x: x.__name__, type(op).__mro__[:-2])), p=prefix, s=suffix, ), ) return ss
[docs] class EdgeAttributes: """Simple class to hold edge data.""" def __init__(self, *args, **kwds): self.variables = {} self.update(*args, **kwds)
[docs] def update(self, variable=None, topology=None): if variable is None: assert topology is None return self.variables.setdefault(variable, set()).add(topology)
def __str__(self): prefix = "&nbsp;&nbsp<b>" suffix = "</b>&nbsp;&nbsp" ss = "<h2>Variable dependencies</h2>{}".format( "\n".join( "{p}{}:{s}{}".format( v.pretty_name, ", ".join( v.pretty_name if (t is None) else v[t].short_description() for t in self.variables[v] ), p=prefix, s=suffix, ) for v in self.variables ) ) return ss.replace("\n", "<br>")
[docs] class ComputationalGraphNodeData: """ Simple class to hold some node data. """ def __init__(self, current_level, node_id): self.current_level = current_level self.node_id = node_id self.apply_kargs = [] # list of dictionnary, last one has priority def __str__(self): return f"(lvl={self.current_level},id={self.node_id})"
if __debug__: # python in debug mode, all decorators do check their target attribute def not_initialized(f): assert callable(f) @wraps(f) def _not_initialized(*args, **kargs): return f(*args, **kargs) self = args[0] msg = "Cannot call {}.{}() on node '{}' because {}".format( self.__class__.__name__, f.__name__, self.name, "{}" ) if self.initialized: reason = "this node has already been initialized." raise RuntimeError(msg.format(reason)) return _not_initialized def initialized(f): assert callable(f) @wraps(f) def _initialized(*args, **kargs): self = args[0] msg = "Cannot call {}.{}() on node '{}' because {}".format( self.__class__.__name__, f.__name__, self.name, "{}" ) if not self.initialized: reason = "this node has not been initialized yet." raise RuntimeError(msg.format(reason)) return f(*args, **kargs) return _initialized def discretized(f): assert callable(f) @wraps(f) def _discretized(*args, **kargs): self = args[0] msg = "Cannot call {}.{}() on node '{}' because {}".format( self.__class__.__name__, f.__name__, self.name, "{}" ) if not self.discretized: reason = "this node has not been discretized yet." raise RuntimeError(msg.format(reason)) return f(*args, **kargs) return _discretized def ready(f): assert callable(f) @wraps(f) def _ready(*args, **kargs): self = args[0] msg = "Cannot call {}.{}() on node '{}' because {}".format( self.__class__.__name__, f.__name__, self.name, "{}" ) if not self.ready: reason = "this node has not been set up." raise RuntimeError(msg.format(reason)) return f(*args, **kargs) return _ready def graph_built(f): assert callable(f) @wraps(f) def _graph_built(*args, **kargs): self = args[0] msg = "Cannot call {}.{}() on node '{}' because {}".format( self.__class__.__name__, f.__name__, self.name, "{}" ) if not self.graph_built: reason = "the graph has not been built yet." raise RuntimeError(msg.format(reason)) return f(*args, **kargs) return _graph_built def generated(f): assert callable(f) @wraps(f) def _generated(*args, **kargs): self = args[0] msg = "Cannot call {}.{}() on node '{}' because {}".format( self.__class__.__name__, f.__name__, self.name, "{}" ) if not self.generated: reason = "this node has not been generated yet." raise RuntimeError(msg.format(reason)) return f(*args, **kargs) return _generated else: # not __debug__ # python optimized, no checks
[docs] def not_initialized(f): return f
[docs] def initialized(f): return f
[docs] def discretized(f): return f
[docs] def ready(f): return f
[docs] def graph_built(f): return f
[docs] def generated(f): return f
[docs] def op_apply(f): @debug @profile @ready def apply(*args, **kwds): dbg = "dbg" in kwds dbg = dbg and (kwds["dbg"] is not None) dbg = dbg and (kwds["dbg"].enable_on_op_apply) debug_dump = "debug_dumper" in kwds debug_dump = debug_dump and (kwds["debug_dumper"] is not None) debug_dump = debug_dump and (kwds["debug_dumper"].enable_on_op_apply) op = args[0] if debug_dump: assert "simulation" in kwds simu = kwds["simulation"] it = simu.current_iteration t = simu.t() _file = inspect.getsourcefile(f) _, _line = inspect.getsourcelines(f) description = f"{_file}:{_line}" for param in sorted(op.input_params.keys(), key=lambda x: x.name): tag = f"pre_{op.name}_{param.name}" kwds["debug_dumper"]( it, t, tag, (param._value,), description=description ) for dfield in sorted( op.input_discrete_fields.values(), key=lambda x: x.name ): tag = f"pre_{op.name}_{dfield.name}" kwds["debug_dumper"]( it, t, tag, tuple( df.sdata.get().handle[df.compute_slices] for df in dfield.dfields ), description=description, ) ret = f(*args, **kwds) for param in sorted(op.output_params.keys(), key=lambda x: x.name): tag = f"post_{op.name}_{param.name}" kwds["debug_dumper"]( it, t, tag, (param._value,), description=description ) for dfield in sorted( op.output_discrete_fields.values(), key=lambda x: x.name ): tag = f"post_{op.name}_{dfield.name}" kwds["debug_dumper"]( it, t, tag, tuple( df.sdata.get().handle[df.compute_slices] for df in dfield.dfields ), description=description, ) return ret elif dbg: msg = inspect.getsourcefile(f) kwds["dbg"]("pre " + msg, nostack=True) ret = f(*args, **kwds) kwds["dbg"]("post " + msg, nostack=True) return ret else: return f(*args, **kwds) return ret return apply